-- Test fixture: a dedicated schema plus session settings for this file.
CREATE SCHEMA split_shard_replication_setup_schema;
SET search_path TO split_shard_replication_setup_schema;
-- Single placement, single shard, and a fixed starting shard id, so the shard
-- ids referenced literally by the scenarios in this file are predictable.
SET citus.shard_replication_factor TO 1;
SET citus.shard_count TO 1;
SET citus.next_shard_id TO 1;

-- Capture the node id of each worker into a psql variable (\gset) for later use.
SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset
SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset

-- Blocks until the given table holds exactly expectedCount rows.
--
-- Parameters:
--   tableName     - name of the table to poll; it is interpolated into the
--                   query with %s (no identifier quoting), so callers must
--                   pass a trusted, already valid table reference.
--   expectedCount - row count to wait for.
--
-- Returns void once COUNT(*) on the table equals expectedCount. There is no
-- timeout: the function keeps polling until the counts match.
CREATE OR REPLACE FUNCTION wait_for_expected_rowcount_at_table(tableName text, expectedCount integer) RETURNS void AS $$
DECLARE
    actualCount integer;
BEGIN
    EXECUTE FORMAT('SELECT COUNT(*) FROM %s', tableName) INTO actualCount;
    WHILE expectedCount != actualCount LOOP
        -- Pause briefly between polls instead of busy-spinning, so the wait
        -- does not burn CPU while the expected rows have not arrived yet.
        PERFORM pg_sleep(0.1);
        EXECUTE FORMAT('SELECT COUNT(*) FROM %s', tableName) INTO actualCount;
    END LOOP;
END$$ LANGUAGE plpgsql;

-- Blocks until the given table holds exactly expectedCount rows whose
-- value column equals 'b'.
--
-- Parameters:
--   tableName     - name of the table to poll; it is interpolated into the
--                   query with %s (no identifier quoting), so callers must
--                   pass a trusted, already valid table reference.
--   expectedCount - number of rows with value = 'b' to wait for.
--
-- Returns void once the filtered COUNT(*) equals expectedCount. There is no
-- timeout: the function keeps polling until the counts match.
CREATE OR REPLACE FUNCTION wait_for_updated_rowcount_at_table(tableName text, expectedCount integer) RETURNS void AS $$
DECLARE
    actualCount integer;
BEGIN
    EXECUTE FORMAT($query$SELECT COUNT(*) FROM %s WHERE value='b'$query$, tableName) INTO actualCount;
    WHILE expectedCount != actualCount LOOP
        -- Pause briefly between polls instead of busy-spinning, so the wait
        -- does not burn CPU while the expected updates have not arrived yet.
        PERFORM pg_sleep(0.1);
        EXECUTE FORMAT($query$SELECT COUNT(*) FROM %s WHERE value='b'$query$, tableName) INTO actualCount;
    END LOOP;
END$$ LANGUAGE plpgsql;

-- Create distributed table (non co-located)
-- This is the source table for the split scenarios; it is distributed on id.
CREATE TABLE table_to_split (id bigserial PRIMARY KEY, value char);
SELECT create_distributed_table('table_to_split','id');

-- Test scenario one starts from here
-- 1. table_to_split is a citus distributed table
-- 2. Shard table_to_split_1 is located on worker1.
-- 3. table_to_split_1 is split into table_to_split_2 and table_to_split_3.
--    table_to_split_2/3 are located on worker2
-- 4. execute UDF split_shard_replication_setup on worker1 with below
--    params:
--    worker_split_shard_replication_setup
--        (
--          ARRAY[
--                ROW(1 /*source shardId */, 2 /* new shardId */,-2147483648 /* minHashValue */, -1 /* maxHashValue */ , 18 /* nodeId where new shard is placed */ ),
--                ROW(1, 3 , 0 , 2147483647, 18 )
--               ]
--         );
-- 5. Create Replication slot with 'citus'
-- 6. Setup Pub/Sub
-- 7. Insert into table_to_split_1 at source worker1
-- 8. Expect the results in either table_to_split_2 or table_to_split_3 at worker2

-- Create the parent and child shard tables at worker2; the subscription
-- created later in this scenario is set up on this node.
\c - - - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
CREATE TABLE table_to_split_1(id bigserial PRIMARY KEY, value char);
CREATE TABLE table_to_split_2(id bigserial PRIMARY KEY, value char);
CREATE TABLE table_to_split_3(id bigserial PRIMARY KEY, value char);

-- Create dummy shard tables (table_to_split_2/3) at worker1
-- This is needed for Pub/Sub framework to work.
\c - - - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
CREATE TABLE table_to_split_2(id bigserial PRIMARY KEY, value char);
CREATE TABLE table_to_split_3(id bigserial PRIMARY KEY, value char);

-- Create publication at worker1
SET citus.enable_ddl_propagation TO off;
CREATE PUBLICATION pub1 FOR TABLE table_to_split_1, table_to_split_2, table_to_split_3;
RESET citus.enable_ddl_propagation;

-- Register the split of shard 1 into shard 2 (hash range -2147483648..-1) and
-- shard 3 (hash range 0..2147483647), both placed on worker2.
SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[
    ROW(1, 'id', 2, '-2147483648', '-1', :worker_2_node)::pg_catalog.split_shard_info,
    ROW(1, 'id', 3, '0', '2147483647', :worker_2_node)::pg_catalog.split_shard_info
    ], 0);

-- we create replication slots with a name including the next_operation_id as a suffix
-- if this test file fails, make sure you compare the next_operation_id output to the object name in the next command
SHOW citus.next_operation_id;

-- NOTE(review): the '_10_0' suffix presumably encodes the table owner id (10)
-- and the operation id (0); confirm against the slot naming used by the UDF.
SELECT slot_name FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_10_0', :worker_2_node), 'citus') \gset

-- Create subscription at worker2 with copy_data to 'false' and derived replication slot name
\c - - - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;

-- NOTE(review): port 57637 is hard-coded; presumably it is worker1's port
-- (where pub1 was created) - confirm it matches :worker_1_port.
CREATE SUBSCRIPTION sub1
        CONNECTION 'host=localhost port=57637 user=postgres dbname=regression'
        PUBLICATION pub1
               WITH (
                   create_slot=false,
                   enabled=true,
                   slot_name=:slot_name,
                   copy_data=false);

-- No data is present at this moment in all the below tables at worker2
SELECT * FROM table_to_split_1;
SELECT * FROM table_to_split_2;
SELECT * FROM table_to_split_3;

-- Insert data in table_to_split_1 at worker1
\c - - - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
INSERT INTO table_to_split_1 values(100, 'a');
INSERT INTO table_to_split_1 values(400, 'a');
INSERT INTO table_to_split_1 values(500, 'a');

-- Show the contents of the parent and the dummy child tables at worker1
SELECT * FROM table_to_split_1;
SELECT * FROM table_to_split_2;
SELECT * FROM table_to_split_3;


-- Expect data to be present in shard 2 and shard 3 based on the hash value.
\c - - - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
SELECT * FROM table_to_split_1; -- should always have zero rows

-- Wait for the replicated rows to arrive before printing each child table
SELECT wait_for_expected_rowcount_at_table('table_to_split_2', 1);
SELECT * FROM table_to_split_2;

SELECT wait_for_expected_rowcount_at_table('table_to_split_3', 2);
SELECT * FROM table_to_split_3;

-- UPDATE data of table_to_split_1 from worker1
\c - - - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
UPDATE table_to_split_1 SET value='b' WHERE id = 100;
UPDATE table_to_split_1 SET value='b' WHERE id = 400;
UPDATE table_to_split_1 SET value='b' WHERE id = 500;

\c - - - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
SELECT * FROM table_to_split_1;

-- Value should be updated in table_to_split_2;
SELECT wait_for_updated_rowcount_at_table('table_to_split_2', 1);
SELECT * FROM table_to_split_2;

-- Value should be updated in table_to_split_3;
SELECT wait_for_updated_rowcount_at_table('table_to_split_3', 2);
SELECT * FROM table_to_split_3;

-- Delete all rows of table_to_split_1 at worker1
\c - - - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
DELETE FROM table_to_split_1;

-- Child shard rows should be deleted
\c - - - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;

SELECT wait_for_expected_rowcount_at_table('table_to_split_1', 0);
SELECT * FROM table_to_split_1;

SELECT wait_for_expected_rowcount_at_table('table_to_split_2', 0);
SELECT * FROM table_to_split_2;

SELECT wait_for_expected_rowcount_at_table('table_to_split_3', 0);
SELECT * FROM table_to_split_3;

-- drop publication from worker1
\c - - - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
DROP PUBLICATION pub1;

-- drop subscription from worker2, showing only messages of level ERROR or above
\c - - - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
SET client_min_messages TO ERROR;
DROP SUBSCRIPTION sub1;

\c - - - :master_port
-- Test scenario (Parent and one child on same node. Other child on different node)
-- 1. table_to_split_1 is split into table_to_split_2 and table_to_split_3.
-- 2. table_to_split_1 is located on worker1.
-- 3. table_to_split_2 is located on worker1 and table_to_split_3 is located on worker2
-- Re-capture the worker node ids into psql variables
SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset
SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset

\c - - - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;

-- Create publication at worker1
SET citus.enable_ddl_propagation TO off;
CREATE PUBLICATION pub1 FOR TABLE table_to_split_1, table_to_split_2, table_to_split_3;
RESET citus.enable_ddl_propagation;

-- Register the split of shard 1: shard 2 is placed on worker1 and shard 3 on worker2
SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[
    ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info,
    ROW(1, 'id', 3, '0', '2147483647', :worker_2_node)::pg_catalog.split_shard_info
    ], 0);

-- we create replication slots with a name including the next_operation_id as a suffix
-- if this test file fails, make sure you compare the next_operation_id output to the object name in the next command
SHOW citus.next_operation_id;

-- One replication slot per target node
SELECT slot_name AS slot_for_worker1 FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_10_0', :worker_1_node), 'citus') \gset
SELECT slot_name AS slot_for_worker2 FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_10_0', :worker_2_node), 'citus') \gset

-- Create subscription at worker1 with copy_data to 'false' and 'slot_for_worker1'
CREATE SUBSCRIPTION sub_worker1
        CONNECTION 'host=localhost port=57637 user=postgres dbname=regression'
        PUBLICATION pub1
               WITH (
                   create_slot=false,
                   enabled=true,
                   slot_name=:slot_for_worker1,
                   copy_data=false);

\c - - - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;

-- Create subscription at worker2 with copy_data to 'false' and 'slot_for_worker2'
CREATE SUBSCRIPTION sub_worker2
        CONNECTION 'host=localhost port=57637 user=postgres dbname=regression'
        PUBLICATION pub1
               WITH (
                   create_slot=false,
                   enabled=true,
                   slot_name=:slot_for_worker2,
                   copy_data=false);

-- No data is present at this moment in all the below tables at worker2
SELECT * FROM table_to_split_1;
SELECT * FROM table_to_split_2;
SELECT * FROM table_to_split_3;

-- Insert data in table_to_split_1 at worker1
\c - - - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
INSERT INTO table_to_split_1 VALUES(100, 'a');
INSERT INTO table_to_split_1 VALUES(400, 'a');
INSERT INTO table_to_split_1 VALUES(500, 'a');
UPDATE table_to_split_1 SET value='b' WHERE id = 400;
SELECT * FROM table_to_split_1;

-- expect data to be present in table_to_split_2 on worker1, as it is the destination for the row with id 400
SELECT wait_for_expected_rowcount_at_table('table_to_split_2', 1);
SELECT id FROM table_to_split_2;
SELECT * FROM table_to_split_3;

-- Expect data to be present only in table_to_split_3 on worker2
\c - - - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
SELECT * FROM table_to_split_1;
SELECT * FROM table_to_split_2;

SELECT wait_for_expected_rowcount_at_table('table_to_split_3', 2);
SELECT * FROM table_to_split_3;

-- delete all from table_to_split_1
\c - - - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
DELETE FROM table_to_split_1;

-- rows from table_to_split_2 should be deleted
SELECT wait_for_expected_rowcount_at_table('table_to_split_2', 0);
SELECT * FROM table_to_split_2;

-- rows from table_to_split_3 should be deleted
\c - - - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;

SELECT wait_for_expected_rowcount_at_table('table_to_split_3', 0);
SELECT * FROM table_to_split_3;

-- drop subscription from worker2
\c - - - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
SET client_min_messages TO ERROR;
DROP SUBSCRIPTION sub_worker2;

-- drop subscription and publication from worker1
\c - - - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
SET client_min_messages TO ERROR;
DROP SUBSCRIPTION sub_worker1;
DROP PUBLICATION pub1;

\c - - - :master_port
-- Test scenario (parent shard and child shards are located on same machine)
-- 1. table_to_split_1 is split into table_to_split_2 and table_to_split_3.
-- 2. table_to_split_1 is located on worker1.
-- 3. table_to_split_2 and table_to_split_3 are located on worker1
-- Re-capture the worker node ids into psql variables
SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset
SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset

\c - - - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
SET client_min_messages TO ERROR;

-- Create publication at worker1
SET citus.enable_ddl_propagation TO off;
CREATE PUBLICATION pub1 for table table_to_split_1, table_to_split_2, table_to_split_3;
RESET citus.enable_ddl_propagation;

-- Worker1 is target for table_to_split_2 and table_to_split_3
SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[
    ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info,
    ROW(1, 'id', 3, '0', '2147483647', :worker_1_node)::pg_catalog.split_shard_info
    ], 0);

-- we create replication slots with a name including the next_operation_id as a suffix
-- if this test file fails, make sure you compare the next_operation_id output to the object name in the next command
SHOW citus.next_operation_id;

-- A single slot is enough here because both children are placed on worker1
SELECT slot_name AS local_slot FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_10_0', :worker_1_node), 'citus') \gset

-- Create subscription at worker1 with copy_data set to 'false' and 'local_slot'
BEGIN;
CREATE SUBSCRIPTION local_subscription
        CONNECTION 'host=localhost port=57637 user=postgres dbname=regression'
        PUBLICATION pub1
               WITH (
                   create_slot=false,
                   enabled=true,
                   slot_name=:local_slot,
                   copy_data=false);
COMMIT;

-- Insert data in table_to_split_1 at worker1
INSERT INTO table_to_split_1 VALUES(100, 'a');
INSERT INTO table_to_split_1 VALUES(400, 'a');
INSERT INTO table_to_split_1 VALUES(500, 'a');

-- expect data to be present in table_to_split_2/3 on worker1
SELECT * FROM table_to_split_1;

SELECT wait_for_expected_rowcount_at_table('table_to_split_2', 1);
SELECT * FROM table_to_split_2;

SELECT wait_for_expected_rowcount_at_table('table_to_split_3', 2);
SELECT * FROM table_to_split_3;

-- Delete all rows from the parent; the child shard rows should be deleted too
DELETE FROM table_to_split_1;

SELECT wait_for_expected_rowcount_at_table('table_to_split_1', 0);
SELECT * FROM table_to_split_1;

SELECT wait_for_expected_rowcount_at_table('table_to_split_2', 0);
SELECT * FROM table_to_split_2;

SELECT wait_for_expected_rowcount_at_table('table_to_split_3', 0);
SELECT * FROM table_to_split_3;

-- clean up
DROP SUBSCRIPTION local_subscription;
DROP PUBLICATION pub1;

-- Setup for the colocated-tables scenario: two users, each owning one table.
\c - - - :master_port
CREATE USER myuser;
CREATE USER admin_user;

GRANT USAGE, CREATE ON SCHEMA split_shard_replication_setup_schema, public to myuser;
GRANT USAGE, CREATE ON SCHEMA split_shard_replication_setup_schema, public to admin_user;

SET search_path TO split_shard_replication_setup_schema;
SET citus.shard_replication_factor TO 1;
SET citus.shard_count TO 1;
SET citus.next_shard_id TO 4;

-- Re-capture the worker node ids into psql variables
SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset
SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset

-- As myuser: create and distribute table_first (shard ids start at 4)
\c - myuser - -
SET search_path TO split_shard_replication_setup_schema;
SET citus.shard_replication_factor TO 1;
SET citus.shard_count TO 1;
SET citus.next_shard_id TO 4;
CREATE TABLE table_first (id bigserial PRIMARY KEY, value char);
SELECT create_distributed_table('table_first','id');

-- As admin_user: create table_second colocated with table_first (shard ids start at 7)
\c - admin_user - -
SET search_path TO split_shard_replication_setup_schema;
SET citus.next_shard_id TO 7;
SET citus.shard_count TO 1;
CREATE TABLE table_second (id bigserial PRIMARY KEY, value char);
SELECT create_distributed_table('table_second', 'id', colocate_with => 'table_first');

-- As myuser: create the child shard tables of table_first at worker1
\c - myuser - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
CREATE TABLE table_first_5(id bigserial PRIMARY KEY, value char);
CREATE TABLE table_first_6(id bigserial PRIMARY KEY, value char);

-- As myuser: create the parent and child shard tables of table_first at worker2
\c - myuser - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
CREATE TABLE table_first_4(id bigserial PRIMARY KEY, value char);
CREATE TABLE table_first_5(id bigserial PRIMARY KEY, value char);
CREATE TABLE table_first_6(id bigserial PRIMARY KEY, value char);

-- As admin_user: create the child shard tables of table_second at worker1
\c - admin_user - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
CREATE TABLE table_second_8(id bigserial PRIMARY KEY, value char);
CREATE TABLE table_second_9(id bigserial PRIMARY KEY, value char);

-- As admin_user: create the parent and child shard tables of table_second at worker2
\c - admin_user - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
CREATE TABLE table_second_7(id bigserial PRIMARY KEY, value char);
CREATE TABLE table_second_8(id bigserial PRIMARY KEY, value char);
CREATE TABLE table_second_9(id bigserial PRIMARY KEY, value char);

--- Test scenario (colocated tables with different owners) starts from here
--- 1. table_first and table_second are colocated tables.
--- 2. myuser is the owner of table_first and admin_user is the owner of table_second.
--- 3. Shard table_first_4 and table_second_7 are colocated on worker1
--- 4. table_first_4 is split into table_first_5 and table_first_6 with target as worker2
--- 5. table_second_7 is split into table_second_8 and table_second_9 with target as worker2
--- 6. Create two publishers and two subscribers for respective table owners.
--- 7. Insert into table_first_4 and table_second_7 at source worker1
--- 8. Expect the results in child shards on worker2

-- Create publication at worker1
\c - postgres - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
SET citus.enable_ddl_propagation TO off;
CREATE PUBLICATION pub1 FOR TABLE table_first_4, table_first_5, table_first_6;
CREATE PUBLICATION pub2 FOR TABLE table_second_7, table_second_8, table_second_9;
RESET citus.enable_ddl_propagation;

-- Register both splits (4 -> 5, 6 and 7 -> 8, 9); all children are placed on worker2
SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[
    ROW(4, 'id', 5, '-2147483648', '-1', :worker_2_node)::pg_catalog.split_shard_info,
    ROW(4, 'id', 6, '0', '2147483647', :worker_2_node)::pg_catalog.split_shard_info,
    ROW(7, 'id', 8, '-2147483648', '-1', :worker_2_node)::pg_catalog.split_shard_info,
    ROW(7, 'id', 9, '0', '2147483647', :worker_2_node)::pg_catalog.split_shard_info
    ], 0);

-- Capture the owner of each table; the owner ids are part of the slot names below
SELECT relowner AS table_owner_one FROM pg_class WHERE relname='table_first' \gset
SELECT relowner AS table_owner_two FROM pg_class WHERE relname='table_second' \gset

-- we create replication slots with a name including the next_operation_id as a suffix
-- if this test file fails, make sure you compare the next_operation_id output to the object name in the next command
SHOW citus.next_operation_id;

SELECT slot_name AS slot_for_first_owner FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_%s_0', :worker_2_node, :table_owner_one), 'citus') \gset

SELECT slot_name AS slot_for_second_owner FROM pg_create_logical_replication_slot(FORMAT('citus_shard_split_slot_%s_%s_0', :worker_2_node, :table_owner_two), 'citus') \gset

-- Create subscription at worker2 with copy_data to 'false'
\c - postgres - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
SET client_min_messages TO WARNING;
CREATE SUBSCRIPTION sub1
        CONNECTION 'host=localhost port=57637 user=postgres dbname=regression'
        PUBLICATION pub1
               WITH (
                   create_slot=false,
                   enabled=true,
                   slot_name=:slot_for_first_owner,
                   copy_data=false);

-- As myuser: insert into table_first_4 at worker1
\c - myuser - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
INSERT INTO table_first_4 VALUES(100, 'a');
INSERT INTO table_first_4 VALUES(400, 'a');
INSERT INTO table_first_4 VALUES(500, 'a');

SELECT wait_for_expected_rowcount_at_table('table_first_4', 3);
SELECT * FROM table_first_4;

-- As admin_user: insert into table_second_7 at worker1
\c - admin_user - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
INSERT INTO table_second_7 VALUES(100, 'a');
INSERT INTO table_second_7 VALUES(400, 'a');

SELECT wait_for_expected_rowcount_at_table('table_second_7', 2);
SELECT * FROM table_second_7;

-- expect data in table_first_5/6
\c - myuser - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
SELECT * FROM table_first_4;

SELECT wait_for_expected_rowcount_at_table('table_first_5', 1);
SELECT * FROM table_first_5;

SELECT wait_for_expected_rowcount_at_table('table_first_6', 2);
SELECT * FROM table_first_6;

-- should have zero rows in all the below tables as the subscription is not yet created for admin_user
\c - admin_user - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
SELECT * FROM table_second_7;
SELECT * FROM table_second_8;
SELECT * FROM table_second_9;

-- Create the second subscription (for pub2) at worker2
\c - postgres - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
SET client_min_messages TO WARNING;
CREATE SUBSCRIPTION sub2
        CONNECTION 'host=localhost port=57637 user=postgres dbname=regression'
        PUBLICATION pub2
               WITH (
                   create_slot=false,
                   enabled=true,
                   slot_name=:slot_for_second_owner,
                   copy_data=false);

-- expect data
\c - admin_user - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
SELECT * FROM table_second_7;

SELECT wait_for_expected_rowcount_at_table('table_second_8', 1);
SELECT * FROM table_second_8;

SELECT wait_for_expected_rowcount_at_table('table_second_9', 1);
SELECT * FROM table_second_9;

-- drop publications from worker1
\c - postgres - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
DROP PUBLICATION pub1;
DROP PUBLICATION pub2;

-- drop subscriptions from worker2
\c - postgres - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
SET client_min_messages TO ERROR;
DROP SUBSCRIPTION sub1;
DROP SUBSCRIPTION sub2;

\c - - - :master_port
-- Test Scenario
-- 1) Setup shared memory segment by calling worker_split_shard_replication_setup.
-- 2) Redo step 1 without releasing the earlier memory. Redoing will trigger a warning as the earlier memory isn't released.
-- 3) Execute worker_split_shard_release_dsm to release the dynamic shared memory segment
-- 4) Redo step 1 and expect no warning as the earlier memory is cleaned up.

SELECT nodeid AS worker_1_node FROM pg_dist_node WHERE nodeport=:worker_1_port \gset
SELECT nodeid AS worker_2_node FROM pg_dist_node WHERE nodeport=:worker_2_port \gset

\c - - - :worker_1_port
SET search_path TO split_shard_replication_setup_schema;
-- Step 1: first setup call, with messages below ERROR hidden.
-- NOTE(review): presumably this hides a warning caused by memory left over from
-- the earlier scenarios, which never call worker_split_shard_release_dsm - confirm.
SET client_min_messages TO ERROR;
SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[
    ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info,
    ROW(1, 'id', 3, '0', '2147483647', :worker_1_node)::pg_catalog.split_shard_info
    ], 0);

-- Step 2: repeat the setup with warnings visible
SET client_min_messages TO WARNING;
SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[
    ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info,
    ROW(1, 'id', 3, '0', '2147483647', :worker_1_node)::pg_catalog.split_shard_info
    ], 0);

-- Steps 3 and 4: release the segment, repeat the setup, then release again
SELECT pg_catalog.worker_split_shard_release_dsm();
SELECT count(*) FROM pg_catalog.worker_split_shard_replication_setup(ARRAY[
    ROW(1, 'id', 2, '-2147483648', '-1', :worker_1_node)::pg_catalog.split_shard_info,
    ROW(1, 'id', 3, '0', '2147483647', :worker_1_node)::pg_catalog.split_shard_info
    ], 0);
SELECT pg_catalog.worker_split_shard_release_dsm();

-- cleanup, we are done with these manually created test tables
DROP TABLE table_to_split_1, table_to_split_2, table_to_split_3;

\c - - - :worker_2_port
SET search_path TO split_shard_replication_setup_schema;
DROP TABLE table_to_split_1, table_to_split_2, table_to_split_3;
-- Back on the coordinator: clean up orphaned resources, then drop the schema
-- and the two test users (dropping the objects they own first).
\c - - - :master_port
CALL pg_catalog.citus_cleanup_orphaned_resources();
SET client_min_messages TO ERROR;
DROP SCHEMA split_shard_replication_setup_schema CASCADE;
DROP OWNED BY myuser;
DROP USER myuser;
DROP OWNED BY admin_user;
DROP USER admin_user;
